// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importutilv2

import (
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/v2/util/merr"
	"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
)

func TestOption_GetTimeout(t *testing.T) {
	const delta = 3 * time.Second

	options := []*commonpb.KeyValuePair{{Key: Timeout, Value: "300s"}}
	ts, err := GetTimeoutTs(options)
	assert.NoError(t, err)
	pt := tsoutil.PhysicalTime(ts)
	assert.WithinDuration(t, time.Now().Add(300*time.Second), pt, delta)

	options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1.5h"}}
	ts, err = GetTimeoutTs(options)
	assert.NoError(t, err)
	pt = tsoutil.PhysicalTime(ts)
	assert.WithinDuration(t, time.Now().Add(90*time.Minute), pt, delta)

	options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1h45m"}}
	ts, err = GetTimeoutTs(options)
	assert.NoError(t, err)
	pt = tsoutil.PhysicalTime(ts)
	assert.WithinDuration(t, time.Now().Add(105*time.Minute), pt, delta)

	options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "invalidTime"}}
	_, err = GetTimeoutTs(options)
	assert.Error(t, err)
}

func TestOption_ParseTimeRange(t *testing.T) {
	s, e, err := ParseTimeRange(nil)
	assert.NoError(t, err)
	assert.Equal(t, uint64(0), s)
	assert.Equal(t, uint64(math.MaxUint64), e)

	startTs := tsoutil.GetCurrentTime()
	options := []*commonpb.KeyValuePair{{Key: StartTs, Value: fmt.Sprintf("%d", startTs)}}
	s, e, err = ParseTimeRange(options)
	assert.NoError(t, err)
	assert.Equal(t, startTs, s)
	assert.Equal(t, uint64(math.MaxUint64), e)

	endTs := tsoutil.GetCurrentTime()
	options = []*commonpb.KeyValuePair{{Key: EndTs, Value: fmt.Sprintf("%d", endTs)}}
	s, e, err = ParseTimeRange(options)
	assert.NoError(t, err)
	assert.Equal(t, uint64(0), s)
	assert.Equal(t, endTs, e)

	options = []*commonpb.KeyValuePair{{Key: EndTs, Value: "&%#$%^&%^&$%^&&"}}
	_, _, err = ParseTimeRange(options)
	assert.ErrorIs(t, err, merr.ErrImportFailed)

	physicalTs := time.Now().UnixMilli()
	options = []*commonpb.KeyValuePair{{Key: EndTs, Value: fmt.Sprintf("%d", physicalTs)}}
	_, _, err = ParseTimeRange(options)
	assert.ErrorIs(t, err, merr.ErrImportFailed)

	options = []*commonpb.KeyValuePair{{Key: StartTs, Value: "0"}}
	_, _, err = ParseTimeRange(options)
	assert.ErrorIs(t, err, merr.ErrImportFailed)
}

func TestOption_SkipDiskQuotaCheck(t *testing.T) {
	// Neither backup nor l0_import, should return false
	options := []*commonpb.KeyValuePair{}
	assert.False(t, SkipDiskQuotaCheck(options))

	// backup = true, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "true"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.True(t, SkipDiskQuotaCheck(options))

	// backup = true, skip_disk_quota_check = false
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "true"},
		{Key: SkipDQC, Value: "false"},
	}
	assert.False(t, SkipDiskQuotaCheck(options))

	// l0_import = true, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: L0Import, Value: "true"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.True(t, SkipDiskQuotaCheck(options))

	// l0_import = true, skip_disk_quota_check = false
	options = []*commonpb.KeyValuePair{
		{Key: L0Import, Value: "true"},
		{Key: SkipDQC, Value: "false"},
	}
	assert.False(t, SkipDiskQuotaCheck(options))

	// backup = false, l0_import = true, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "false"},
		{Key: L0Import, Value: "true"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.True(t, SkipDiskQuotaCheck(options))

	// backup = true, l0_import = false, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "true"},
		{Key: L0Import, Value: "false"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.True(t, SkipDiskQuotaCheck(options))

	// backup = false, l0_import = false, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "false"},
		{Key: L0Import, Value: "false"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.False(t, SkipDiskQuotaCheck(options))

	// backup = true, l0_import = true, skip_disk_quota_check = true
	options = []*commonpb.KeyValuePair{
		{Key: BackupFlag, Value: "true"},
		{Key: L0Import, Value: "true"},
		{Key: SkipDQC, Value: "true"},
	}
	assert.True(t, SkipDiskQuotaCheck(options))
}

func TestOption_GetCSVSep(t *testing.T) {
	options := []*commonpb.KeyValuePair{}
	r, err := GetCSVSep(options)
	assert.NoError(t, err)
	assert.Equal(t, ',', r)

	options = []*commonpb.KeyValuePair{
		{Key: CSVSep, Value: "|"},
	}
	r, err = GetCSVSep(options)
	assert.NoError(t, err)
	assert.Equal(t, '|', r)

	unsupportedSep := []rune{0, '\n', '\r', '"', 0xFFFD}
	for _, sep := range unsupportedSep {
		options = []*commonpb.KeyValuePair{
			{Key: CSVSep, Value: string(sep)},
		}
		_, err = GetCSVSep(options)
		assert.Error(t, err)
	}
}

func TestOption_GetCSVNullKey(t *testing.T) {
	options := []*commonpb.KeyValuePair{}
	nullKey, err := GetCSVNullKey(options)
	assert.NoError(t, err)
	assert.Equal(t, "", nullKey)

	options = []*commonpb.KeyValuePair{
		{Key: CSVNullKey, Value: "ABC"},
	}
	nullKey, err = GetCSVNullKey(options)
	assert.NoError(t, err)
	assert.Equal(t, "ABC", nullKey)
}

func TestOption_GetStorageVersion(t *testing.T) {
	// Test case 1: No storage_version option set, should return StorageV1 by default
	options := []*commonpb.KeyValuePair{}
	version, err := GetStorageVersion(options)
	assert.NoError(t, err)
	assert.Equal(t, int64(0), version) // StorageV1 = 0

	// Test case 2: storage_version set to "2", should return StorageV2
	options = []*commonpb.KeyValuePair{
		{Key: StorageVersion, Value: "2"},
	}
	version, err = GetStorageVersion(options)
	assert.NoError(t, err)
	assert.Equal(t, int64(2), version) // StorageV2 = 2
}

func TestSimple(t *testing.T) {
	// Simple test to verify the test environment works
	assert.Equal(t, 1, 1)
	assert.Equal(t, "test", "test")
}
